본문으로 건너뛰기

데이터베이스 연결과 SQL 활용

데이터베이스는 구조화된 데이터를 저장하고 관리하는 핵심 시스템이다. 대부분의 조직에서 중요한 데이터는 데이터베이스에 저장되어 있으며, 이러한 데이터에 접근하고 활용하는 능력은 데이터 분석가에게 필수적이다.

SQL 기초와 데이터 추출

SQL(Structured Query Language)은 데이터베이스와 소통하는 표준 언어다. 데이터 분석가라면 기본적인 SQL 문법은 반드시 알아야 한다.

기본 SELECT 문과 조건 필터링

-- 기본 SELECT 문
SELECT * FROM customers;

-- 특정 컬럼만 선택
SELECT customer_id, customer_name, city FROM customers;

-- 조건 필터링
SELECT * FROM customers
WHERE city = '서울';

-- 여러 조건 사용
SELECT * FROM customers
WHERE city = '서울' AND age >= 30;

-- 범위 조건
SELECT * FROM sales
WHERE sale_date BETWEEN '2023-01-01' AND '2023-12-31';

-- 패턴 매칭
SELECT * FROM customers
WHERE customer_name LIKE '김%';

JOIN을 활용한 테이블 결합

-- INNER JOIN
SELECT c.customer_name, s.product_name, s.quantity
FROM customers c
INNER JOIN sales s ON c.customer_id = s.customer_id;

-- LEFT JOIN
SELECT c.customer_name, s.product_name, s.quantity
FROM customers c
LEFT JOIN sales s ON c.customer_id = s.customer_id;

-- 여러 테이블 조인
SELECT c.customer_name, p.product_name, s.quantity, s.sale_date
FROM customers c
INNER JOIN sales s ON c.customer_id = s.customer_id
INNER JOIN products p ON s.product_id = p.product_id;

집계 함수와 GROUP BY 활용

-- 기본 집계 함수
SELECT COUNT(*) as total_customers FROM customers;
SELECT AVG(age) as average_age FROM customers;
SELECT SUM(quantity) as total_quantity FROM sales;

-- GROUP BY와 함께 사용
SELECT city, COUNT(*) as customer_count
FROM customers
GROUP BY city;

-- HAVING 절로 그룹 필터링
SELECT city, COUNT(*) as customer_count
FROM customers
GROUP BY city
HAVING COUNT(*) >= 10;

-- 복합 집계
SELECT
city,
COUNT(*) as customer_count,
AVG(age) as average_age,
MIN(age) as min_age,
MAX(age) as max_age
FROM customers
GROUP BY city;

Python에서 데이터베이스 연결

Python에서 다양한 데이터베이스에 연결하는 방법을 알아보자.

SQLite 로컬 데이터베이스 활용

SQLite는 파일 기반의 경량 데이터베이스로, 별도의 서버 설치 없이 사용할 수 있다.

import sqlite3
import pandas as pd

# SQLite 데이터베이스 연결
conn = sqlite3.connect('sample.db')

# 테이블 생성 및 데이터 삽입
cursor = conn.cursor()

# 테이블 생성
cursor.execute('''
CREATE TABLE IF NOT EXISTS customers (
customer_id INTEGER PRIMARY KEY,
customer_name TEXT NOT NULL,
city TEXT,
age INTEGER
)
''')

# 샘플 데이터 삽입
sample_data = [
(1, '김철수', '서울', 30),
(2, '이영희', '부산', 25),
(3, '박민수', '서울', 35),
(4, '최지은', '대구', 28)
]

cursor.executemany(
'INSERT OR REPLACE INTO customers VALUES (?, ?, ?, ?)',
sample_data
)

conn.commit()

# pandas로 데이터 읽기
df = pd.read_sql_query("SELECT * FROM customers", conn)
print(df)

# 연결 종료
conn.close()

DuckDB 로컬 데이터베이스 활용

DuckDB는 분석 작업에 최적화된 고성능 분석용 데이터베이스다.

import duckdb
import pandas as pd

# DuckDB 연결
conn = duckdb.connect('analytics.db')

# CSV 파일을 직접 쿼리
result = conn.execute("""
SELECT city, COUNT(*) as count
FROM read_csv_auto('customers.csv')
GROUP BY city
""").fetchdf()

print(result)

# pandas DataFrame을 DuckDB에서 직접 사용
df = pd.DataFrame({
'name': ['김철수', '이영희', '박민수'],
'age': [30, 25, 35],
'city': ['서울', '부산', '서울']
})

# DataFrame을 DuckDB에서 쿼리
result = conn.execute("""
SELECT city, AVG(age) as avg_age
FROM df
GROUP BY city
""").fetchdf()

print(result)

conn.close()

MySQL/PostgreSQL 연결 (SQLAlchemy)

from sqlalchemy import create_engine
import pandas as pd

# MySQL 연결
mysql_engine = create_engine('mysql+pymysql://user:password@localhost/database')

# PostgreSQL 연결
postgres_engine = create_engine('postgresql://user:password@localhost/database')

# 데이터 읽기
df = pd.read_sql_query("""
SELECT customer_id, customer_name, city
FROM customers
WHERE city = '서울'
""", mysql_engine)

# 데이터 쓰기
df.to_sql('new_table', mysql_engine, if_exists='replace', index=False)

연결 풀링과 성능 최적화

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# 연결 풀 설정
engine = create_engine(
'mysql+pymysql://user:password@localhost/database',
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_pre_ping=True # 연결 상태 확인
)

# 컨텍스트 매니저 사용
def safe_db_operation(query):
with engine.connect() as conn:
result = pd.read_sql_query(query, conn)
return result

# 사용 예시
df = safe_db_operation("SELECT * FROM customers LIMIT 100")

대용량 데이터 처리

청크 단위 데이터 로딩

def process_large_table(engine, table_name, chunk_size=10000):
"""대용량 테이블을 청크 단위로 처리"""

# 전체 레코드 수 확인
total_count = pd.read_sql_query(
f"SELECT COUNT(*) as count FROM {table_name}",
engine
).iloc[0]['count']

print(f"전체 레코드 수: {total_count}")

# 청크 단위로 처리
processed_data = []

for offset in range(0, total_count, chunk_size):
chunk_query = f"""
SELECT * FROM {table_name}
LIMIT {chunk_size} OFFSET {offset}
"""

chunk_df = pd.read_sql_query(chunk_query, engine)

# 각 청크에 대한 처리
processed_chunk = chunk_df.groupby('category').agg({
'value': 'sum',
'count': 'size'
})

processed_data.append(processed_chunk)

print(f"처리 완료: {offset + len(chunk_df)}/{total_count}")

# 결과 병합
final_result = pd.concat(processed_data).groupby(level=0).sum()
return final_result

메모리 사용량 모니터링

import psutil
import pandas as pd

def monitor_memory_usage(func):
"""메모리 사용량을 모니터링하는 데코레이터"""
def wrapper(*args, **kwargs):
# 시작 시 메모리 사용량
process = psutil.Process()
start_memory = process.memory_info().rss / 1024 / 1024 # MB

print(f"시작 메모리 사용량: {start_memory:.2f} MB")

# 함수 실행
result = func(*args, **kwargs)

# 종료 시 메모리 사용량
end_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_diff = end_memory - start_memory

print(f"종료 메모리 사용량: {end_memory:.2f} MB")
print(f"메모리 증가량: {memory_diff:.2f} MB")

return result

return wrapper

@monitor_memory_usage
def load_large_dataset(engine, query):
return pd.read_sql_query(query, engine)

인덱스 활용과 쿼리 최적화

def optimize_query_performance(engine):
"""쿼리 성능 최적화 예시"""

# 인덱스 생성
with engine.connect() as conn:
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_customer_city
ON customers(city)
""")

conn.execute("""
CREATE INDEX IF NOT EXISTS idx_sales_date
ON sales(sale_date)
""")

conn.commit()

# 최적화된 쿼리 작성
optimized_query = """
SELECT
c.city,
COUNT(*) as customer_count,
SUM(s.amount) as total_sales
FROM customers c
INNER JOIN sales s ON c.customer_id = s.customer_id
WHERE s.sale_date >= '2023-01-01'
GROUP BY c.city
HAVING COUNT(*) >= 5
ORDER BY total_sales DESC
"""

return pd.read_sql_query(optimized_query, engine)

실무에서 자주 사용하는 SQL 패턴

시계열 데이터 분석

-- 월별 매출 추이
SELECT
DATE_FORMAT(sale_date, '%Y-%m') as month,
SUM(amount) as monthly_sales,
COUNT(*) as transaction_count
FROM sales
WHERE sale_date >= '2023-01-01'
GROUP BY DATE_FORMAT(sale_date, '%Y-%m')
ORDER BY month;

-- 이동 평균 계산
SELECT
sale_date,
amount,
AVG(amount) OVER (
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_7days
FROM sales
ORDER BY sale_date;

고객 세분화

-- RFM 분석을 위한 고객 세분화
SELECT
customer_id,
DATEDIFF(CURDATE(), MAX(sale_date)) as recency,
COUNT(*) as frequency,
SUM(amount) as monetary
FROM sales
GROUP BY customer_id;

상위 N개 항목 조회

-- 카테고리별 상위 3개 제품
SELECT *
FROM (
SELECT
category,
product_name,
sales_amount,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales_amount DESC) as rn
FROM product_sales
) ranked
WHERE rn <= 3;

Python과 SQL 연계 활용

동적 쿼리 생성

def generate_dynamic_query(filters):
"""동적으로 SQL 쿼리 생성"""
base_query = "SELECT * FROM customers WHERE 1=1"

if filters.get('city'):
base_query += f" AND city = '{filters['city']}'"

if filters.get('min_age'):
base_query += f" AND age >= {filters['min_age']}"

if filters.get('max_age'):
base_query += f" AND age <= {filters['max_age']}"

return base_query

# 사용 예시
filters = {'city': '서울', 'min_age': 25}
query = generate_dynamic_query(filters)
df = pd.read_sql_query(query, engine)

데이터 검증 및 품질 체크

def validate_data_quality(engine, table_name):
"""데이터 품질 검증"""

quality_checks = {}

# 1. 중복 레코드 확인
duplicate_query = f"""
SELECT COUNT(*) - COUNT(DISTINCT customer_id) as duplicates
FROM {table_name}
"""
quality_checks['duplicates'] = pd.read_sql_query(duplicate_query, engine).iloc[0]['duplicates']

# 2. 결측치 확인
null_query = f"""
SELECT
SUM(CASE WHEN customer_name IS NULL THEN 1 ELSE 0 END) as null_names,
SUM(CASE WHEN city IS NULL THEN 1 ELSE 0 END) as null_cities,
SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) as null_ages
FROM {table_name}
"""
null_counts = pd.read_sql_query(null_query, engine).iloc[0]
quality_checks['null_counts'] = null_counts.to_dict()

# 3. 데이터 범위 확인
range_query = f"""
SELECT
MIN(age) as min_age,
MAX(age) as max_age,
COUNT(DISTINCT city) as unique_cities
FROM {table_name}
"""
ranges = pd.read_sql_query(range_query, engine).iloc[0]
quality_checks['ranges'] = ranges.to_dict()

return quality_checks

# 사용 예시
quality_report = validate_data_quality(engine, 'customers')
print(quality_report)

정리

데이터베이스 연결과 SQL 활용은 데이터 분석가의 핵심 역량이다. 기본적인 SQL 문법부터 Python과의 연계, 대용량 데이터 처리까지 다양한 기법을 익혀두면 실무에서 큰 도움이 된다. 특히 데이터 품질 검증과 성능 최적화는 실제 프로젝트에서 매우 중요한 부분이므로 충분히 연습해두는 것이 좋다.

다음 섹션에서는 웹 데이터 수집에 대해 알아보겠다.